ComposableTimeSeriesForestRegressor (sktime-style TSF)

ComposableTimeSeriesForestRegressor (sktime-style TSF)#

The Time-Series Forest (TSF) is an interval-based ensemble: each tree trains on summary features computed over random intervals of the input series.

This notebook implements a ComposableTimeSeriesForestRegressor([…]) inspired by sktime’s TimeSeriesForestRegressor family:

  • You choose the summary functions (e.g., mean/std/slope) via a list.

  • Each tree samples random intervals, extracts features, and fits a DecisionTreeRegressor.

  • Predictions are the average across trees.

We’ll also show how to use it for one-step-ahead forecasting via a sliding-window reduction.

Goals#

  • Explain TSF intuition and why interval features work

  • Implement ComposableTimeSeriesForestRegressor([...]) in NumPy + scikit-learn

  • Demonstrate multi-seasonal forecasting via a windowed regression dataset

  • Plot forecast behavior and residual diagnostics with Plotly

Algorithm sketch#

Given training pairs \((X_i, y_i)\) where each \(X_i\) is a univariate (or multivariate) time series of fixed length \(m\).

For each tree \(b=1,\dots,B\):

  1. Sample \(K\) random intervals \([s_k, e_k)\) with \(0\le s_k < e_k \le m\).

  2. For each interval, compute a set of summary features \(f_1,\dots,f_F\) (e.g., mean/std/slope) on that segment.

  3. Concatenate all interval features into a feature vector \(\phi_b(X_i)\).

  4. Fit a decision tree regressor \(T_b\) on \((\phi_b(X_i), y_i)\).

Prediction is the ensemble average: $\(\hat{y}(X) = \frac{1}{B}\sum_{b=1}^{B} T_b(\phi_b(X)).\)$

Why it works (intuition): random intervals capture local patterns (level shifts, spikes, seasonal fragments, trend segments). Trees then learn which interval statistics matter.

import numpy as np
import pandas as pd

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio

from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

from scipy import stats

pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"

rng = np.random.default_rng(7)

import numpy, pandas, sklearn, scipy, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("scipy:", scipy.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
scipy: 1.15.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
    """Accept (n, m) or (n, d, m). Return (n, d, m)."""
    X = np.asarray(X, dtype=float)
    if X.ndim == 2:
        return X[:, None, :]
    if X.ndim == 3:
        return X
    raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")


def _acf(x: np.ndarray, max_lag: int) -> tuple[np.ndarray, np.ndarray]:
    x = np.asarray(x, dtype=float)
    x = x - x.mean()
    denom = float(np.dot(x, x))
    lags = np.arange(max_lag + 1)
    values = np.zeros(max_lag + 1)
    values[0] = 1.0
    if denom == 0.0:
        return lags, values
    for k in range(1, max_lag + 1):
        values[k] = float(np.dot(x[k:], x[:-k]) / denom)
    return lags, values


def _feature_mean(seg2d: np.ndarray) -> np.ndarray:
    return np.mean(seg2d, axis=1)


def _feature_std(seg2d: np.ndarray) -> np.ndarray:
    return np.std(seg2d, axis=1, ddof=0)


def _feature_min(seg2d: np.ndarray) -> np.ndarray:
    return np.min(seg2d, axis=1)


def _feature_max(seg2d: np.ndarray) -> np.ndarray:
    return np.max(seg2d, axis=1)


def _feature_slope(seg2d: np.ndarray) -> np.ndarray:
    """Least-squares slope vs time index within the interval."""
    n = seg2d.shape[1]
    if n < 2:
        return np.zeros(seg2d.shape[0])
    t = np.arange(n, dtype=float)
    t = t - t.mean()
    denom = float(np.dot(t, t))
    y = seg2d - seg2d.mean(axis=1, keepdims=True)
    return (y @ t) / denom


_BUILTIN_FEATURES: dict[str, callable] = {
    "mean": _feature_mean,
    "std": _feature_std,
    "slope": _feature_slope,
    "min": _feature_min,
    "max": _feature_max,
}


def _resolve_feature_functions(feature_functions) -> list[tuple[str, callable]]:
    """Return list of (name, f(seg2d)->(n_samples,))."""
    if feature_functions is None:
        feature_functions = ["mean", "std", "slope"]

    resolved: list[tuple[str, callable]] = []
    for i, ff in enumerate(feature_functions):
        if isinstance(ff, str):
            if ff not in _BUILTIN_FEATURES:
                raise ValueError(f"Unknown feature '{ff}'. Available: {sorted(_BUILTIN_FEATURES)}")
            resolved.append((ff, _BUILTIN_FEATURES[ff]))
            continue

        if isinstance(ff, tuple) and len(ff) == 2 and isinstance(ff[0], str) and callable(ff[1]):
            name, func = ff
            resolved.append((name, func))
            continue

        if callable(ff):
            name = getattr(ff, "__name__", f"feature_{i}")

            def _wrapped(seg2d: np.ndarray, _f=ff) -> np.ndarray:
                return np.apply_along_axis(_f, 1, seg2d).astype(float)

            resolved.append((name, _wrapped))
            continue

        raise TypeError("feature_functions must contain str, callable, or (name, callable)")

    # make names unique
    seen: dict[str, int] = {}
    unique: list[tuple[str, callable]] = []
    for name, func in resolved:
        if name not in seen:
            seen[name] = 1
            unique.append((name, func))
        else:
            seen[name] += 1
            unique.append((f"{name}_{seen[name]}", func))
    return unique


def _random_intervals(
    n_timepoints: int,
    n_intervals: int,
    *,
    min_length: int,
    max_length: int | None,
    rng: np.random.Generator,
) -> list[tuple[int, int]]:
    n_timepoints = int(n_timepoints)
    min_length = int(min_length)
    if min_length < 2:
        min_length = 2

    if max_length is None:
        max_length = n_timepoints
    max_length = int(min(max_length, n_timepoints))

    if min_length > max_length:
        raise ValueError("min_length cannot exceed max_length")

    intervals: list[tuple[int, int]] = []
    for _ in range(int(n_intervals)):
        length = int(rng.integers(min_length, max_length + 1))
        start = int(rng.integers(0, n_timepoints - length + 1))
        end = start + length
        intervals.append((start, end))
    return intervals


def _transform_intervals(
    X3: np.ndarray,
    intervals: list[tuple[int, int]],
    feature_functions: list[tuple[str, callable]],
) -> np.ndarray:
    n, d, _m = X3.shape
    features: list[np.ndarray] = []
    for dim in range(d):
        Xd = X3[:, dim, :]
        for (start, end) in intervals:
            seg = Xd[:, start:end]
            for _name, func in feature_functions:
                features.append(func(seg).reshape(n, 1))
    if not features:
        return np.zeros((n, 0), dtype=float)
    return np.concatenate(features, axis=1)
class ComposableTimeSeriesForestRegressor:
    """sktime-style TSF regressor (interval features + tree ensemble).

    Parameters
    ----------
    feature_functions : list
        List of features to compute per interval.
        Each entry can be:
        - str in {"mean","std","slope","min","max"}
        - callable that maps 1D -> scalar
        - (name, callable) where callable maps seg2d -> vector (n_samples,)
    n_estimators : int
        Number of trees.
    n_intervals : int | "sqrt"
        Number of random intervals per tree. "sqrt" uses int(sqrt(m)).
    min_interval_length : int
        Minimum interval length.
    max_interval_length : int | None
        Maximum interval length (None -> full length).
    bootstrap : bool
        Bootstrap samples per tree.
    random_state : int | None
        Seed.
    tree_params : dict | None
        Passed to sklearn DecisionTreeRegressor.
    """

    def __init__(
        self,
        feature_functions=None,
        *,
        n_estimators: int = 200,
        n_intervals: int | str = "sqrt",
        min_interval_length: int = 3,
        max_interval_length: int | None = None,
        bootstrap: bool = True,
        random_state: int | None = None,
        tree_params: dict | None = None,
    ):
        self.feature_functions = feature_functions
        self.n_estimators = int(n_estimators)
        self.n_intervals = n_intervals
        self.min_interval_length = int(min_interval_length)
        self.max_interval_length = max_interval_length
        self.bootstrap = bool(bootstrap)
        self.random_state = random_state
        self.tree_params = {} if tree_params is None else dict(tree_params)

        self.feature_functions_: list[tuple[str, callable]] | None = None
        self.intervals_: list[list[tuple[int, int]]] | None = None
        self.estimators_: list[DecisionTreeRegressor] | None = None
        self.n_timepoints_: int | None = None
        self.n_dims_: int | None = None

    def fit(self, X, y):
        X3 = _as_3d_panel(X)
        y = np.asarray(y, dtype=float)
        if X3.shape[0] != y.shape[0]:
            raise ValueError("X and y must have the same number of samples")

        n, d, m = X3.shape
        self.n_timepoints_ = int(m)
        self.n_dims_ = int(d)
        self.feature_functions_ = _resolve_feature_functions(self.feature_functions)

        if isinstance(self.n_intervals, str):
            if self.n_intervals != "sqrt":
                raise ValueError("n_intervals must be int or 'sqrt'")
            n_intervals = max(1, int(np.sqrt(m)))
        else:
            n_intervals = max(1, int(self.n_intervals))

        rng = np.random.default_rng(self.random_state)
        self.estimators_ = []
        self.intervals_ = []

        for _ in range(self.n_estimators):
            seed = int(rng.integers(0, 2**32 - 1))
            r = np.random.default_rng(seed)

            intervals = _random_intervals(
                n_timepoints=m,
                n_intervals=n_intervals,
                min_length=self.min_interval_length,
                max_length=self.max_interval_length,
                rng=r,
            )
            Phi = _transform_intervals(X3, intervals, self.feature_functions_)

            if self.bootstrap:
                idx = r.integers(0, n, size=n)
                Phi_fit = Phi[idx]
                y_fit = y[idx]
            else:
                Phi_fit = Phi
                y_fit = y

            tree = DecisionTreeRegressor(random_state=seed, **self.tree_params)
            tree.fit(Phi_fit, y_fit)

            self.estimators_.append(tree)
            self.intervals_.append(intervals)

        return self

    def predict(self, X) -> np.ndarray:
        if self.estimators_ is None or self.intervals_ is None or self.feature_functions_ is None:
            raise RuntimeError("Call fit() before predict().")

        X3 = _as_3d_panel(X)
        n, d, m = X3.shape
        if m != self.n_timepoints_ or d != self.n_dims_:
            raise ValueError(
                f"X must have shape (n,{self.n_dims_},{self.n_timepoints_}) (or (n,{self.n_timepoints_})), got {X3.shape}"
            )

        preds = np.zeros(n, dtype=float)
        for tree, intervals in zip(self.estimators_, self.intervals_):
            Phi = _transform_intervals(X3, intervals, self.feature_functions_)
            preds += tree.predict(Phi)
        return preds / len(self.estimators_)

Demo: multi-seasonal one-step forecasting via sliding windows#

We generate a single time series with two seasonalities (weekly + ~monthly) and correlated noise.

Then we build a supervised dataset:

  • input: the last \(L\) observations

  • target: the next observation

This reduction turns forecasting into regression on fixed-length time series windows.

def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
    eps = rng.normal(0.0, sigma, size=n)
    u = np.zeros(n)
    for t in range(1, n):
        u[t] = phi * u[t - 1] + eps[t]
    return u


n = 700
idx = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)

weekly = 2.0 * np.sin(2 * np.pi * t / 7) + 0.5 * np.cos(2 * np.pi * t / 7)
monthly = 1.3 * np.sin(2 * np.pi * t / 30) - 0.4 * np.cos(2 * np.pi * t / 30)
trend = 0.02 * t
noise = simulate_ar1_noise(n, phi=0.6, sigma=0.7, rng=rng)

y = 50.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")

fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic multi-seasonal series", xaxis_title="date", yaxis_title="value")
fig.show()
def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
    y = np.asarray(y, dtype=float)
    L = int(window_length)
    if L < 2:
        raise ValueError("window_length must be >= 2")
    if y.size <= L:
        raise ValueError("y is too short for the chosen window_length")

    X = np.column_stack([y[i : y.size - L + i] for i in range(L)])
    y_next = y[L:]
    return X, y_next


L = 60
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)

# time-aware split
h = 120
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]

tsf = ComposableTimeSeriesForestRegressor(
    ["mean", "std", "slope", "min", "max"],
    n_estimators=150,
    n_intervals="sqrt",
    min_interval_length=5,
    bootstrap=True,
    random_state=7,
    tree_params={"max_depth": 12, "min_samples_leaf": 3},
)

tsf.fit(X_train, y_train)
y_pred = tsf.predict(X_test)

mae = mean_absolute_error(y_test, y_pred)
rmse = mean_squared_error(y_test, y_pred, squared=False)
r2 = r2_score(y_test, y_pred)

print(f"MAE:  {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
print(f"R^2:  {r2:.3f}")
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[5], line 37
     34 y_pred = tsf.predict(X_test)
     36 mae = mean_absolute_error(y_test, y_pred)
---> 37 rmse = mean_squared_error(y_test, y_pred, squared=False)
     38 r2 = r2_score(y_test, y_pred)
     40 print(f"MAE:  {mae:.3f}")

File ~/miniconda3/lib/python3.12/site-packages/sklearn/utils/_param_validation.py:194, in validate_params.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    191 func_sig = signature(func)
    193 # Map *args/**kwargs to the function signature
--> 194 params = func_sig.bind(*args, **kwargs)
    195 params.apply_defaults()
    197 # ignore self/cls and positional/keyword markers

File ~/miniconda3/lib/python3.12/inspect.py:3277, in Signature.bind(self, *args, **kwargs)
   3272 def bind(self, /, *args, **kwargs):
   3273     """Get a BoundArguments object, that maps the passed `args`
   3274     and `kwargs` to the function's signature.  Raises `TypeError`
   3275     if the passed arguments can not be bound.
   3276     """
-> 3277     return self._bind(args, kwargs)

File ~/miniconda3/lib/python3.12/inspect.py:3266, in Signature._bind(self, args, kwargs, partial)
   3256         raise TypeError(
   3257             'got some positional-only arguments passed as '
   3258             'keyword arguments: {arg!r}'.format(
   (...)   3263             ),
   3264         )
   3265     else:
-> 3266         raise TypeError(
   3267             'got an unexpected keyword argument {arg!r}'.format(
   3268                 arg=next(iter(kwargs))))
   3270 return self._bound_arguments_cls(self, arguments)

TypeError: got an unexpected keyword argument 'squared'
# One-step-ahead predictions over the test horizon
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=y_pred, name="pred (1-step)", line=dict(color="#4E79A7")))
fig.update_layout(title="TSF one-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()

fig = go.Figure()
fig.add_trace(go.Scatter(x=y_test, y=y_pred, mode="markers", name="points", marker=dict(opacity=0.7)))
lo = min(float(np.min(y_test)), float(np.min(y_pred)))
hi = max(float(np.max(y_test)), float(np.max(y_pred)))
fig.add_trace(go.Scatter(x=[lo, hi], y=[lo, hi], mode="lines", name="y=x", line=dict(color="black", dash="dash")))
fig.update_layout(title="Actual vs predicted", xaxis_title="actual", yaxis_title="predicted")
fig.show()
def recursive_forecast(model, history: np.ndarray, steps: int, window_length: int) -> np.ndarray:
    history = np.asarray(history, dtype=float).tolist()
    preds = []
    for _ in range(int(steps)):
        x = np.asarray(history[-window_length:], dtype=float)[None, :]
        y_hat = float(model.predict(x)[0])
        preds.append(y_hat)
        history.append(y_hat)
    return np.asarray(preds)


# Compare recursive multi-step vs ground-truth over a shorter horizon
h2 = 45
start = len(y) - h
hist = y.to_numpy()[:start]
truth = y.to_numpy()[start : start + h2]
t_h2 = y.index[start : start + h2]

rec = recursive_forecast(tsf, history=hist, steps=h2, window_length=L)

fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index[start - 120 : start], y=y.to_numpy()[start - 120 : start], name="history", line=dict(color="rgba(0,0,0,0.35)")))
fig.add_trace(go.Scatter(x=t_h2, y=truth, name="truth", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_h2, y=rec, name="recursive forecast", line=dict(color="#E15759")))
fig.update_layout(title="Recursive multi-step forecast (reduction)", xaxis_title="date", yaxis_title="value")
fig.show()
# Residual diagnostics (test horizon)
resid = y_test - y_pred
print("residual mean:", float(np.mean(resid)))
print("residual std:", float(np.std(resid, ddof=1)))
print("Jarque-Bera:", stats.jarque_bera(resid))

lags, acf_vals = _acf(resid, max_lag=30)
bound = 1.96 / np.sqrt(resid.size)

# QQ data
nq = resid.size
p = (np.arange(1, nq + 1) - 0.5) / nq
theoretical = stats.norm.ppf(p)
sample_q = np.sort((resid - resid.mean()) / resid.std(ddof=1))

fig = make_subplots(
    rows=2,
    cols=2,
    subplot_titles=("Residuals over time", "Residual histogram", "Residual ACF", "QQ plot (std residuals)"),
)

fig.add_trace(go.Scatter(x=t_test, y=resid, name="residuals", line=dict(color="#4E79A7")), row=1, col=1)
fig.add_hline(y=0, line=dict(color="black", dash="dash"), row=1, col=1)

fig.add_trace(go.Histogram(x=resid, nbinsx=25, name="hist", marker_color="#4E79A7"), row=1, col=2)

fig.add_trace(go.Bar(x=lags, y=acf_vals, name="ACF(resid)", marker_color="#4E79A7"), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[bound, bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)
fig.add_trace(go.Scatter(x=[0, lags.max()], y=[-bound, -bound], mode="lines", line=dict(color="gray", dash="dash"), showlegend=False), row=2, col=1)

fig.add_trace(go.Scatter(x=theoretical, y=sample_q, mode="markers", name="QQ", marker=dict(color="#4E79A7")), row=2, col=2)
fig.add_trace(
    go.Scatter(x=[theoretical.min(), theoretical.max()], y=[theoretical.min(), theoretical.max()], mode="lines", line=dict(color="black", dash="dash"), showlegend=False),
    row=2,
    col=2,
)

fig.update_layout(height=750, title="Residual diagnostics (test horizon)")
fig.show()
residual mean: 2.022139488149232
residual std: 1.7655684826389506
Jarque-Bera: SignificanceResult(statistic=2.147517772919136, pvalue=0.3417216075363839)
# Visual intuition: show random intervals used by one tree on the last input window
tree_idx = 0
intervals = tsf.intervals_[tree_idx]

window = X_test[-1]
x_axis = np.arange(window.size)

fig = go.Figure()
fig.add_trace(go.Scatter(x=x_axis, y=window, name="window", line=dict(color="black")))

for (s, e) in intervals[:12]:
    fig.add_vrect(x0=s, x1=e - 1, fillcolor="rgba(78,121,167,0.10)", line_width=0)

fig.update_layout(
    title=f"Example random intervals (tree {tree_idx}) over one input window",
    xaxis_title="lag index in window",
    yaxis_title="value",
)
fig.show()